Skip to content

Rewrite the parquet output adapter manager#712

Merged
arhamchopra merged 13 commits into
mainfrom
ac/parquet_output_adapter
Jun 29, 2026
Merged

Rewrite the parquet output adapter manager#712
arhamchopra merged 13 commits into
mainfrom
ac/parquet_output_adapter

Conversation

@arhamchopra

@arhamchopra arhamchopra commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Rewrite the parquet output adapter for RecordBatch-based writing

Replaces the old per-format C++ file-writer hierarchy (FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer) with a RecordBatch sink architecture: ParquetWriter builds Arrow RecordBatches and hands them to a single concrete C++ RecordBatchSink that writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.

Motivation

The old output path had:

  • A per-format writer-wrapper class hierarchy plus a FileWriterWrapperContainer to fan out to split-column files
  • A separate DialectGenericListWriterInterface for list columns, duplicating type dispatch already implemented in the shared Arrow layer
  • Per-type column-builder code (StructColumnArrayBuilder) that re-implemented nested-struct serialization the Arrow nodes already do
  • A registered-but-unreachable parquet_dict_basket_output_adapter (dead since basket output goes through the parquet_dict_basket_writer node)

The new implementation:

  • Builds one arrow::RecordBatch per batch_size rows and writes it through a single RecordBatchSink, so the file backend is one self-contained component
  • Serializes every column through the shared Arrow type machinery — the same per-type conversion the struct_to_record_batches Arrow node and the input adapter use: struct columns go through the shared ArrowFieldWriter, and scalar columns append values directly through an ArrowScalarColumnWriter<T> kernel that the field writer is itself built on
  • Keeps all file I/O in C++ (no per-batch Python / Arrow C Data Interface hop on the write path)
  • Bakes file- and column-level metadata into the Arrow schema in C++, preserved identically in single-file and split-column modes

Architecture

ParquetOutputAdapter (per published column / struct / list)
  ├→ ScalarColumnArrayBuilder<T>  — appends the ticked value directly (ArrowScalarColumnWriter<T> kernel)
  ├→ ArrowBackedArrayBuilder      — reads a field from a published struct (shared ArrowFieldWriter)
  └→ ListColumnArrayBuilder       — list / numpy-array columns
ParquetWriter (EndCycleListener)
  ├→ accumulates rows, builds arrow::RecordBatch every batch_size rows
  ├→ bakes file_metadata / column_metadata into the schema in start()
  └→ drives the sink: onStart(schema) / onFileChange(path) / onBatch(rb) / onStop()
RecordBatchSink                   — one concrete class, writes files entirely in C++:
        single   : one .parquet / .arrow file with all columns
        split    : a directory, one file per column
        rotation : onFileChange closes the current file and opens the next

RecordBatchSink (new, RecordBatchSink.cpp) is a single concrete class with ordinary member functions (onStart/onBatch/onFileChange/onStop) and member state — it owns overwrite checks, parent-directory creation, file rotation, compression, and the optional file_visitor. onFileChange returns whether a file is open after the call, so the writer tracks open/closed state directly. Compression is resolved through Arrow's own arrow::util::Codec API rather than a hardcoded name map.

ArrowScalarColumnWriter<T> (new, ArrowScalarWriter.h) is the single home of the per-type Arrow-builder choice and value conversion for scalar/temporal/string/enum types. It is shared two ways: the scalar output column (ScalarColumnArrayBuilder<T>) appends ticked values to it directly, and the struct field writer (ScalarFieldWriter<T> in the shared ArrowFieldWriter) reads a struct field then appends through the same kernel. csp's row-at-a-time "may-not-tick → null" model is handled by appending value-or-null per row; no synthetic scratch struct is needed for scalar columns.

ArrowBackedArrayBuilder (new) bridges published struct columns onto the shared ArrowFieldWriter: each row it is handed the source Struct* and the field writer reads the field (or appends null if the struct did not tick).

ParquetOutputAdapterManager is simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.

What's removed

  • FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer — the old per-format C++ file-writer hierarchy
  • DialectGenericListWriterInterface — list columns now go through ListColumnArrayBuilder
  • StructColumnArrayBuilder — nested-struct columns are written by the shared ArrowFieldWriter::NestedStructWriter
  • The per-column scratch struct — scalar columns no longer fabricate a one-field Struct to reuse the struct serializer; they append values directly through ArrowScalarColumnWriter<T>
  • parquet_dict_basket_output_adapter — registered but unreachable dead adapter
  • A large slice of ParquetOutputAdapter.cpp / ArrowSingleColumnArrayBuilder.h per-type boilerplate, folded into the shared Arrow writer

Bug fixes / hardening

Surfaced by review of the new sink:

  • mkdir("") on a bare filename — writing to a relative path with no directory component (e.g. "out.parquet") no longer fails with Invalid argument; the empty dirname is guarded.
  • Close-path exception safety — the current writer is reset before the user file_visitor runs (no double-close if it throws), and the output stream / all split sub-writers are always closed even if one close fails (no leaked file descriptors).
  • Compression resolution via Arrow — codecs are resolved through arrow::util::Codec::GetCompressionType + IsAvailable (case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.
  • Explicit writeTimestampColumn honored — an explicitly requested timestamp column is no longer silently downgraded.
  • Single-file + dict basket fails fast — publishing a dict basket without split_columns_to_files=True now raises a clear error instead of a low-level IOError.
  • FileExistsError — writing over an existing file with allow_overwrite=False raises Python FileExistsError.
  • DIALECT_GENERIC struct field raises — an object-typed generic struct field errors at construction instead of being silently dropped from the output.
  • Removed a dead manager-level index sink and made adapter-manager stop() flush/close all writers before destroying any of them.

API compatibility

The public Python API (ParquetWriter, ParquetOutputConfig, publish, publish_struct, publish_dict_basket, filename_provider, file_visitor, file_metadata / column_metadata) is unchanged. csp/tests/adapters/test_parquet_output.py (77 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, nested-struct schema shape, overwrite/FileExistsError) and the existing test_parquet.py suite pass.

ParquetWriter builds RecordBatches and hands them to a pluggable
RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++
file-writer hierarchy and unifies output conversion via visitCspValueType.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter,
de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column
files in split mode, and expand output tests.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Closure-based C++ sink writes parquet/IPC/split-column files directly,
removing the per-batch C++<->Python hop. Adds FileExistsError.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
  TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
  resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove the unused manager-level m_indexSink/setIndexSink (per-basket index
sinks are unaffected) and stop all writers before destroying any in stop().

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a
debug-only length assert in buildRecordBatch, and document that file_visitor
runs synchronously on the engine thread.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra marked this pull request as ready for review June 13, 2026 02:27
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

@ptomecek ptomecek left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through the whole output path (ParquetWriter -> RecordBatchSink -> makeFileSink), the dict-basket teardown, the Python wiring, and compared everything against the old FileWriterWrapper hierarchy. This is a clean rewrite. Each of the hardening fixes is real and correctly done: the mkdir guard for bare filenames, the close-path exception safety (no double-close, all sub-writers and the stream get closed even if one throws), compression resolved through Arrow's codec API, the explicit timestamp column being honored, the single-file + dict-basket fast failure, FileExistsError on overwrite, and the stop()-stops-everything-before-destroying-anything teardown. Arrow Status/Result values are checked at every call site. I didn't find any correctness, resource, or concurrency bugs.

Two low-severity things inline, plus one parity note I couldn't attach to a line:

Nested-struct field ordering: ArrowFieldWriter::NestedStructWriter (in ArrowFieldWriter.cpp, which isn't part of this PR) orders the arrow struct's child fields by declaration order, whereas the old code used the struct's memory-layout order. It reads back fine through the new name-based reader, but the on-disk child order differs from files written by older csp, which matters for anything reading those columns positionally. A test that pins down the struct-within-struct schema shape would be worth adding.

Overall this looks good to merge. The one thing I'd sort out first is the silently-dropped column below, since it turns an error into missing data.

Comment thread cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp
Comment thread cpp/csp/adapters/parquet/ParquetWriter.cpp Outdated
Comment thread csp/tests/adapters/test_parquet_output.py
Comment thread csp/tests/adapters/test_parquet_output.py
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra requested a review from ptomecek June 17, 2026 20:33
@ptomecek

Copy link
Copy Markdown
Collaborator

Looks good to me now

ptomecek
ptomecek previously approved these changes Jun 18, 2026

@AdamGlustein AdamGlustein left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a mere human who attempted reading through this code, I find it very difficult to follow with the lambda-based approach. First, figuring out what function is actually being called is not easy, and deciphering what actually makes it into the closure of that function is even harder. It would be much simpler (in my opinion) to make these callbacks member functions and replace the lambda capture variables with member variables for these classes.

Also, I think some of the complexity here was necessitated by being backwards compatible with the original Parquet adapter. In my opinion, I think we can relax that constraint. There are a lot of overly specific things (like symbol columns, etc.) that we don't need to support and can just drop completely from the code. With a minor version bump we're not going to break any contracts, and I highly doubt some of these vestigial features are used by anyone anymore.

Comment thread cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp Outdated
Comment thread cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp
Comment thread cpp/csp/adapters/parquet/RecordBatchFileSink.cpp Outdated
Comment thread cpp/csp/adapters/parquet/ParquetWriter.cpp Outdated
Comment thread cpp/csp/adapters/parquet/ParquetWriter.cpp Outdated
Comment thread csp/adapters/output_adapters/parquet.py Outdated
Comment thread cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp Outdated
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra force-pushed the ac/parquet_output_adapter branch from fa752ed to 60aa0bf Compare June 25, 2026 18:09
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
@arhamchopra arhamchopra merged commit 8ff82fd into main Jun 29, 2026
25 checks passed
@arhamchopra arhamchopra deleted the ac/parquet_output_adapter branch June 29, 2026 14:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants